{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Start the beam-flink job server"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from hops import beam as hops_beam\n",
    "# Start Beam jobservice\n",
    "hops_beam.start(taskmanager_heap_size=8192)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "MV-CWK2kI-iH"
   },
   "source": [
    "## TFMA Notebook example\n",
    "\n",
    "This notebook describes how to export your model for TFMA and demonstrates the analysis tooling it offers.\n",
    "\n",
    "Note: Please make sure to follow the instructions in [README.md](https://github.com/tensorflow/tfx/blob/master/tfx/examples/chicago_taxi/README.md) when running this notebook"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "MV-CWK2kI-iH"
   },
   "source": [
    "## Setup\n",
    "\n",
    "Import necessary packages."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "xFbGgkXAJCJ7"
   },
   "outputs": [],
   "source": [
    "import apache_beam as beam\n",
    "from hops import beam as hops_beam\n",
    "import os\n",
    "from tfx.examples.chicago_taxi import preprocess\n",
    "import shutil\n",
    "import tensorflow as tf\n",
    "import tensorflow_data_validation as tfdv\n",
    "import tensorflow_model_analysis as tfma\n",
    "from google.protobuf import text_format \n",
    "from tensorflow.python.lib.io import file_io\n",
    "from tensorflow_transform.beam.tft_beam_io import transform_fn_io\n",
    "from tensorflow_transform.coders import example_proto_coder\n",
    "from tensorflow_transform.saved import saved_transform_io\n",
    "from tensorflow_transform.tf_metadata import dataset_schema\n",
    "from tensorflow_transform.tf_metadata import schema_utils\n",
    "from tfx.examples.chicago_taxi.trainer import task\n",
    "from tfx.examples.chicago_taxi.trainer import taxi\n",
    "from hops import hdfs as hopsfs\n",
    "from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions, SetupOptions, HadoopFileSystemOptions, PortableOptions, WorkerOptions, DebugOptions"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Download input dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import requests, zipfile\n",
    "from io import BytesIO\n",
    "r = requests.get(\"http://snurran.sics.se/hops/beam/chicago_taxi_data.zip\", stream=True)\n",
    "z = zipfile.ZipFile(BytesIO(r.content))\n",
    "z.extractall()\n",
    "\n",
    "# Copy data into resources\n",
    "hopsfs.copy_to_hdfs('data', 'Resources', overwrite=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "zpCt_emiJDeb"
   },
   "source": [
    "Helper functions and some constants for running the notebook locally."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "1Axm8YxCJF7K"
   },
   "outputs": [],
   "source": [
    "BASE_DIR = hopsfs.project_path(exclude_nn_addr=True)\n",
    "DATA_DIR = os.path.join(BASE_DIR, 'Resources/data')\n",
    "OUTPUT_DIR = os.path.join(BASE_DIR, 'Resources/taxi_out_local')\n",
    "TMP_DIR = os.path.join(BASE_DIR, 'Resources/taxi_tmp_local')\n",
    "\n",
    "# Base dir containing train and eval data\n",
    "TRAIN_DATA_DIR = os.path.join(DATA_DIR, 'train')\n",
    "EVAL_DATA_DIR = os.path.join(DATA_DIR, 'eval')\n",
    "\n",
    "# Base dir where TFT writes training data\n",
    "TFT_TRAIN_OUTPUT_BASE_DIR = os.path.join(OUTPUT_DIR, 'tft_train')\n",
    "TFT_TRAIN_FILE_PREFIX = 'train_transformed'\n",
    "\n",
    "# Base dir where TFT writes eval data\n",
    "TFT_EVAL_OUTPUT_BASE_DIR = os.path.join(OUTPUT_DIR, 'tft_eval')\n",
    "TFT_EVAL_FILE_PREFIX = 'eval_transformed'\n",
    "\n",
    "TF_OUTPUT_BASE_DIR = os.path.join(OUTPUT_DIR, 'tf')\n",
    "\n",
    "# Base dir where TFMA writes eval data\n",
    "TFMA_OUTPUT_BASE_DIR = os.path.join(OUTPUT_DIR, 'tfma')\n",
    "\n",
    "SERVING_MODEL_DIR = 'serving_model_dir'\n",
    "EVAL_MODEL_DIR = 'eval_model_dir'\n",
    "\n",
    "\n",
    "def get_tft_train_output_dir(run_id):\n",
    "    return _get_output_dir(TFT_TRAIN_OUTPUT_BASE_DIR, run_id)\n",
    "\n",
    "\n",
    "def get_tft_eval_output_dir(run_id):\n",
    "    return _get_output_dir(TFT_EVAL_OUTPUT_BASE_DIR, run_id)\n",
    "\n",
    "\n",
    "def get_tf_output_dir(run_id):\n",
    "    return _get_output_dir(TF_OUTPUT_BASE_DIR, run_id)\n",
    "\n",
    "def get_tfma_output_dir(run_id):\n",
    "    return _get_output_dir(TFMA_OUTPUT_BASE_DIR, run_id)\n",
    "\n",
    "def _get_output_dir(base_dir, run_id):\n",
    "    return os.path.join(base_dir, 'run_' + str(run_id))\n",
    "\n",
    "def get_schema_file():\n",
    "    return os.path.join(OUTPUT_DIR, 'schema.pbtxt')\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "YDaWlFehJH7r"
   },
   "source": [
    "Clean up output directories."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "E7zx4bAOJKZN"
   },
   "outputs": [],
   "source": [
    "if hopsfs.exists(TMP_DIR):\n",
    "    hopsfs.rmr(TMP_DIR)\n",
    "if hopsfs.exists(OUTPUT_DIR):\n",
    "    hopsfs.rmr(OUTPUT_DIR)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Configure the beam pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pipeline_args = hops_beam.get_portable_runner_config()\n",
    "options=PipelineOptions(flags=pipeline_args)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "ZXK_1T-JJL9s"
   },
   "source": [
    "## Compute and visualize descriptive data statistics"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "hXYNl387JOfp"
   },
   "outputs": [],
   "source": [
    "# Compute stats over training data.\n",
    "train_stats = tfdv.generate_statistics_from_csv(data_location=os.path.join(TRAIN_DATA_DIR, 'data.csv'), output_path=os.path.join(TMP_DIR, 'out', 'train_stats'), pipeline_options=options)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "baGGqXSaJQeT"
   },
   "outputs": [],
   "source": [
    "# Visualize training data stats.\n",
    "tfdv.visualize_statistics(train_stats)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "AsOJi9U3JR35"
   },
   "source": [
    "## Infer a schema"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "JSUFwSKoJTtT"
   },
   "outputs": [],
   "source": [
    "# Infer a schema from the training data stats.\n",
    "schema = tfdv.infer_schema(statistics=train_stats, infer_feature_shape=False)\n",
    "tfdv.display_schema(schema=schema)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "kzLwrYqEJWcD"
   },
   "source": [
    "## Check evaluation data for errors"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "NVA5Bor4JaxL"
   },
   "outputs": [],
   "source": [
    "# Compute stats over eval data.\n",
    "eval_stats = tfdv.generate_statistics_from_csv(data_location=os.path.join(EVAL_DATA_DIR, 'data.csv'), output_path=os.path.join(TMP_DIR, 'out', 'eval_stats'), pipeline_options=options)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "NvxUKVMJJcSk"
   },
   "outputs": [],
   "source": [
    "# Compare stats of eval data with training data.\n",
    "tfdv.visualize_statistics(lhs_statistics=eval_stats, rhs_statistics=train_stats,\n",
    "                          lhs_name='EVAL_DATASET', rhs_name='TRAIN_DATASET')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "5QqFQ29tJhEi"
   },
   "outputs": [],
   "source": [
    "# Check eval data for errors by validating the eval data stats using the previously inferred schema.\n",
    "anomalies = tfdv.validate_statistics(statistics=eval_stats, schema=schema)\n",
    "tfdv.display_anomalies(anomalies)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "MZ_xsJgUJiGL"
   },
   "outputs": [],
   "source": [
    "# Update the schema based on the observed anomalies.\n",
    "\n",
    "# Relax the minimum fraction of values that must come from the domain for feature company.\n",
    "company = tfdv.get_feature(schema, 'company')\n",
    "company.distribution_constraints.min_domain_mass = 0.9\n",
    "\n",
    "# Add new value to the domain of feature payment_type.\n",
    "payment_type_domain = tfdv.get_domain(schema, 'payment_type')\n",
    "payment_type_domain.value.append('Prcard')\n",
    "\n",
    "# Validate eval stats after updating the schema \n",
    "updated_anomalies = tfdv.validate_statistics(eval_stats, schema)\n",
    "tfdv.display_anomalies(updated_anomalies)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "gR81RJidJlJT"
   },
   "source": [
    "## Freeze the schema\n",
    "\n",
    "Now that the schema has been reviewed and curated, we will store it in a file to reflect its \"frozen\" state."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "MBC7_QnbJmar"
   },
   "outputs": [],
   "source": [
    "file_io.recursive_create_dir(OUTPUT_DIR)\n",
    "file_io.write_string_to_file(get_schema_file(), text_format.MessageToString(schema))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "q1aDkWP3Jn9a"
   },
   "source": [
    "## Preprocess Inputs\n",
    "\n",
    "transform_data is defined in preprocess.py and uses the tensorflow_transform library to perform preprocessing. The same code is used for both local preprocessing in this notebook and preprocessing in the Cloud (via Dataflow)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "7i00wLcIJqEb"
   },
   "outputs": [],
   "source": [
    "# Transform eval data\n",
    "preprocess.transform_data(input_handle=os.path.join(EVAL_DATA_DIR, 'data.csv'),\n",
    "                          outfile_prefix=TFT_EVAL_FILE_PREFIX, \n",
    "                          working_dir=str(get_tft_eval_output_dir(0)),\n",
    "                          schema_file=get_schema_file(),\n",
    "                          pipeline_args=pipeline_args)\n",
    "print('Done')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "8Onber05Jr46"
   },
   "outputs": [],
   "source": [
    "# Transform training data\n",
    "preprocess.transform_data(input_handle=os.path.join(TRAIN_DATA_DIR, 'data.csv'),\n",
    "                          outfile_prefix=TFT_TRAIN_FILE_PREFIX, \n",
    "                          working_dir=str(get_tft_train_output_dir(0)),\n",
    "                          schema_file=get_schema_file(),\n",
    "                          pipeline_args=pipeline_args)\n",
    "print('Done')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "0LUjgM3AJtwj"
   },
   "source": [
    "## Compute statistics over transformed data "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "Kzks-t0sJxOL"
   },
   "outputs": [],
   "source": [
    "# Compute stats over transformed training data.\n",
    "TRANSFORMED_TRAIN_DATA = os.path.join(get_tft_train_output_dir(0), TFT_TRAIN_FILE_PREFIX + \"*\") \n",
    "transformed_train_stats = tfdv.generate_statistics_from_tfrecord(data_location=TRANSFORMED_TRAIN_DATA, output_path=os.path.join(TMP_DIR, 'out' ,'transformed_train_stats'), pipeline_options=options)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "w83Kd3fXJyga"
   },
   "outputs": [],
   "source": [
    "# Visualize transformed training data stats and compare to raw training data. \n",
    "# Use 'Feature search' to focus on a feature and see statistics pre- and post-transformation.\n",
    "tfdv.visualize_statistics(transformed_train_stats, train_stats, lhs_name='TRANSFORMED', rhs_name='RAW')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "j-SN2NaKJ1Wq"
   },
   "source": [
    "## Prepare the Model\n",
    "\n",
    "To use TFMA, export the model into an **EvalSavedModel** by calling ``tfma.export.export_eval_savedmodel``.\n",
    "\n",
    "``tfma.export.export_eval_savedmodel`` is analogous to ``estimator.export_savedmodel`` but exports the evaluation graph as opposed to the training or inference graph. Notice that one of the inputs is ``eval_input_receiver_fn`` which is analogous to ``serving_input_receiver_fn`` for ``estimator.export_savedmodel``. For more details, refer to the documentation for TFMA on Github.\n",
    "\n",
    "Contruct the **EvalSavedModel** after training is completed."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "a35ueyufJ2bj"
   },
   "outputs": [],
   "source": [
    "def run_experiment(hparams):\n",
    "    \"\"\"Run the training and evaluate using the high level API\"\"\"\n",
    "\n",
    "    # Train and evaluate the model as usual.\n",
    "    estimator = task.train_and_maybe_evaluate(hparams)\n",
    "\n",
    "    # Export TFMA's sepcial EvalSavedModel\n",
    "    eval_model_dir = os.path.join(hparams.output_dir, EVAL_MODEL_DIR)\n",
    "    receiver_fn = lambda: eval_input_receiver_fn(hparams.tf_transform_dir)\n",
    "\n",
    "    tfma.export.export_eval_savedmodel(\n",
    "        estimator=estimator,\n",
    "        export_dir_base=eval_model_dir,\n",
    "        eval_input_receiver_fn=receiver_fn)\n",
    "    \n",
    "def eval_input_receiver_fn(working_dir):\n",
    "    # Extract feature spec from the schema.\n",
    "    raw_feature_spec = schema_utils.schema_as_feature_spec(schema).feature_spec\n",
    "\n",
    "    serialized_tf_example = tf.placeholder(\n",
    "        dtype=tf.string, shape=[None], name='input_example_tensor')\n",
    "\n",
    "    # First we deserialize our examples using the raw schema.\n",
    "    features = tf.parse_example(serialized_tf_example, raw_feature_spec)\n",
    "\n",
    "    # Now that we have our raw examples, we must process them through tft\n",
    "    _, transformed_features = (\n",
    "        saved_transform_io.partially_apply_saved_transform(\n",
    "            os.path.join(working_dir, transform_fn_io.TRANSFORM_FN_DIR),\n",
    "            features))\n",
    "\n",
    "    # The key MUST be 'examples'.\n",
    "    receiver_tensors = {'examples': serialized_tf_example}\n",
    "    \n",
    "    # NOTE: Model is driven by transformed features (since training works on the\n",
    "    # materialized output of TFT, but slicing will happen on raw features.\n",
    "    features.update(transformed_features)\n",
    "    \n",
    "    return tfma.export.EvalInputReceiver(\n",
    "        features=features,\n",
    "        receiver_tensors=receiver_tensors,\n",
    "        labels=transformed_features[taxi.transformed_name(taxi.LABEL_KEY)])\n",
    "\n",
    "print('Done')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "wZV47rcxJ6aC"
   },
   "source": [
    "## Train and export the model for TFMA"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "wXlW87u8J8vb"
   },
   "outputs": [],
   "source": [
    "def run_local_experiment(tft_run_id, tf_run_id, num_layers, first_layer_size, scale_factor):\n",
    "    \"\"\"Helper method to train and export the model for TFMA\n",
    "    \n",
    "    The caller specifies the input and output directory by providing run ids. The optional parameters\n",
    "    allows the user to change the modelfor time series view.\n",
    "    \n",
    "    Args:\n",
    "      tft_run_id: The run id for the preprocessing. Identifies the folder containing training data.\n",
    "      tf_run_id: The run for this training run. Identify where the exported model will be written to.\n",
    "      num_layers: The number of layers used by the hiden layer.\n",
    "      first_layer_size: The size of the first hidden layer.\n",
    "      scale_factor: The scale factor between each layer in in hidden layers.\n",
    "    \"\"\"\n",
    "    hparams = tf.contrib.training.HParams(\n",
    "        # Inputs: are tf-transformed materialized features\n",
    "        train_files=os.path.join(get_tft_train_output_dir(tft_run_id), TFT_TRAIN_FILE_PREFIX + '-00000-of-*'),\n",
    "        eval_files=os.path.join(get_tft_eval_output_dir(tft_run_id), TFT_EVAL_FILE_PREFIX + '-00000-of-*'),\n",
    "        schema_file=get_schema_file(),\n",
    "        # Output: dir for trained model\n",
    "        job_dir=get_tf_output_dir(tf_run_id),\n",
    "        tf_transform_dir=get_tft_train_output_dir(tft_run_id),\n",
    "        \n",
    "        # Output: dir for both the serving model and eval_model which will go into tfma\n",
    "        # evaluation\n",
    "        output_dir=get_tf_output_dir(tf_run_id),\n",
    "        train_steps=1000,\n",
    "        eval_steps=500,\n",
    "        num_layers=num_layers,\n",
    "        first_layer_size=first_layer_size,\n",
    "        scale_factor=scale_factor,\n",
    "        num_epochs=None,\n",
    "        train_batch_size=40,\n",
    "        eval_batch_size=40)\n",
    "\n",
    "    run_experiment(hparams)\n",
    "\n",
    "print('Done')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "LVqhulOSKAly"
   },
   "outputs": [],
   "source": [
    "run_local_experiment(tft_run_id=0,\n",
    "                     tf_run_id=0,\n",
    "                     num_layers=4,\n",
    "                     first_layer_size=100,\n",
    "                     scale_factor=0.7)\n",
    "print('Done')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "8Y6sRSPZKCjs"
   },
   "source": [
    "## Run TFMA to compute metrics\n",
    "For local analysis, TFMA offers a helper method ``tfma.run_model_analysis``"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "uOKlvwTLKH8T"
   },
   "source": [
    "#### You can also write your own custom pipeline if you want to perform extra transformations on the data before evaluation."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "ZKCj7orAKKpq"
   },
   "outputs": [],
   "source": [
    "def run_tfma(slice_spec, tf_run_id, tfma_run_id, input_csv, schema_file, add_metrics_callbacks=None):\n",
    "    \"\"\"A simple wrapper function that runs tfma locally.\n",
    "    \n",
    "    A function that does extra transformations on the data and then run model analysis.\n",
    "    \n",
    "    Args:\n",
    "        slice_spec: The slicing spec for how to slice the data.\n",
    "        tf_run_id: An id to contruct the model directories with.\n",
    "        tfma_run_id: An id to construct output directories with.\n",
    "        input_csv: The evaluation data in csv format.\n",
    "        schema_file: The file holding a text-serialized schema for the input data.\n",
    "        add_metrics_callback: Optional list of callbacks for computing extra metrics.\n",
    "        \n",
    "    Returns:\n",
    "        An EvalResult that can be used with TFMA visualization functions.\n",
    "    \"\"\"\n",
    "    eval_model_base_dir = os.path.join(get_tf_output_dir(tf_run_id), EVAL_MODEL_DIR)\n",
    "    if eval_model_base_dir.startswith('hdfs'):\n",
    "        eval_model_dir = hopsfs.ls(eval_model_base_dir)[0]\n",
    "    else:\n",
    "        eval_model_dir = os.path.join(eval_model_base_dir, next(os.walk(eval_model_base_dir))[1][0])\n",
    "    eval_shared_model = tfma.default_eval_shared_model(\n",
    "        eval_saved_model_path=eval_model_dir,\n",
    "        add_metrics_callbacks=add_metrics_callbacks)\n",
    "    schema = taxi.read_schema(schema_file)\n",
    "    \n",
    "    print(eval_model_dir)\n",
    "    \n",
    "    display_only_data_location = input_csv\n",
    "    \n",
    "    with beam.Pipeline(options=options) as pipeline:\n",
    "        csv_coder = taxi.make_csv_coder(schema)\n",
    "        raw_data = (\n",
    "            pipeline\n",
    "            | 'ReadFromText' >> beam.io.ReadFromText(\n",
    "                input_csv,\n",
    "                coder=beam.coders.BytesCoder(),\n",
    "                skip_header_lines=True)\n",
    "            | 'ParseCSV' >> beam.Map(csv_coder.decode))\n",
    "        \n",
    "        # Examples must be in clean tf-example format.\n",
    "        coder = taxi.make_proto_coder(schema)\n",
    "        raw_data = (\n",
    "            raw_data\n",
    "            | 'ToSerializedTFExample' >> beam.Map(coder.encode))\n",
    "\n",
    "        _ = (raw_data\n",
    "             | 'ExtractEvaluateAndWriteResults' >>\n",
    "             tfma.ExtractEvaluateAndWriteResults(\n",
    "                 eval_shared_model=eval_shared_model,\n",
    "                 slice_spec=slice_spec,\n",
    "                 output_path=get_tfma_output_dir(tfma_run_id),\n",
    "                 display_only_data_location=input_csv))\n",
    "\n",
    "    return tfma.load_eval_result(output_path=get_tfma_output_dir(tfma_run_id))\n",
    "    \n",
    "print('Done')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "DC70sAOfKMo7"
   },
   "source": [
    "#### You can also compute metrics on slices of your data in TFMA. Slices can be specified using ``tfma.slicer.SingleSliceSpec``.\n",
    "\n",
    "Below are examples of how slices can be specified."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "n2JA6QRGKPCr"
   },
   "outputs": [],
   "source": [
    "# An empty slice spec means the overall slice, that is, the whole dataset.\n",
    "OVERALL_SLICE_SPEC = tfma.slicer.SingleSliceSpec()\n",
    "\n",
    "# Data can be sliced along a feature column\n",
    "# In this case, data is sliced along feature column trip_start_hour.\n",
    "FEATURE_COLUMN_SLICE_SPEC = tfma.slicer.SingleSliceSpec(columns=['trip_start_hour'])\n",
    "\n",
    "# Data can be sliced by crossing feature columns\n",
    "# In this case, slices are computed for trip_start_day x trip_start_month.\n",
    "FEATURE_COLUMN_CROSS_SPEC = tfma.slicer.SingleSliceSpec(columns=['trip_start_day', 'trip_start_month'])\n",
    "\n",
    "# Metrics can be computed for a particular feature value.\n",
    "# In this case, metrics is computed for all data where trip_start_hour is 12.\n",
    "FEATURE_VALUE_SPEC = tfma.slicer.SingleSliceSpec(features=[('trip_start_hour', 12)])\n",
    "\n",
    "# It is also possible to mix column cross and feature value cross.\n",
    "# In this case, data where trip_start_hour is 12 will be sliced by trip_start_day.\n",
    "COLUMN_CROSS_VALUE_SPEC = tfma.slicer.SingleSliceSpec(columns=['trip_start_day'], features=[('trip_start_hour', 12)])\n",
    "\n",
    "ALL_SPECS = [\n",
    "    OVERALL_SLICE_SPEC,\n",
    "    FEATURE_COLUMN_SLICE_SPEC, \n",
    "    FEATURE_COLUMN_CROSS_SPEC, \n",
    "    FEATURE_VALUE_SPEC, \n",
    "    COLUMN_CROSS_VALUE_SPEC    \n",
    "]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "As53x7HhKRVn"
   },
   "source": [
    "#### Let's run TFMA!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "9ffmUVhlKUBz"
   },
   "outputs": [],
   "source": [
    "tf.logging.set_verbosity(tf.logging.INFO)\n",
    "\n",
    "tfma_result_1 = run_tfma(input_csv=os.path.join(EVAL_DATA_DIR, 'data.csv'), \n",
    "                         tf_run_id=0, \n",
    "                         tfma_run_id=0,\n",
    "                         slice_spec=ALL_SPECS,\n",
    "                         schema_file=get_schema_file())\n",
    "print('Done')\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "XdNnyYg0KVwi"
   },
   "source": [
    "## Visualization: Slicing Metrics\n",
    "\n",
    "To see the slices, either use the name of the column (by setting slicing_column) or provide a tfma.slicer.SingleSliceSpec (by setting slicing_spec). If neither is provided, the overall will be displayed.\n",
    "\n",
    "The default visualization is **slice overview** when the number of slices is small. It shows the value of a metric for each slice sorted by the another metric. It is also possible to set a threshold to filter out slices with smaller weights.\n",
    "\n",
    "This view also supports **metrics histogram** as an alternative visualization. It is also the defautl view when the number of slices is large. The results will be divided into buckets and the number of slices / total weights / both can be visualized. Slices with small weights can be fitlered out by setting the threshold. Further filtering can be applied by dragging the grey band. To reset the range, double click the band. Filtering can be used to remove outliers in the visualization and the metrics table below."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "VtptlAgnKXt6"
   },
   "outputs": [],
   "source": [
    "# Show data sliced along feature column trip_start_hour.\n",
    "tfma.view.render_slicing_metrics(tfma_result_1, slicing_column='trip_start_hour')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "J2CGlSfHKZ71"
   },
   "outputs": [],
   "source": [
    "# Show metrics sliced by COLUMN_CROSS_VALUE_SPEC above.\n",
    "tfma.view.render_slicing_metrics(tfma_result_1, slicing_spec=COLUMN_CROSS_VALUE_SPEC)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "6lkVP_RQKbgz"
   },
   "outputs": [],
   "source": [
    "# Show overall metrics.\n",
    "tfma.view.render_slicing_metrics(tfma_result_1)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "585hmemUKc6L"
   },
   "source": [
    "## Visualization: Plots\n",
    "\n",
    "TFMA offers a number of built-in plots. To see them, add them to ``add_metrics_callbacks``"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "rSUDOrnYKemd"
   },
   "outputs": [],
   "source": [
    "# Error: RuntimeError: Pipeline BeamApp-jupyter-0808083834-f43977f4_49ae6b5a-bb07-4683-b3d8-0534920c540c \n",
    "# failed in state FAILED: java.io.IOException: The record exceeds the maximum size of a sort\n",
    "# buffer (current maximum: 132579328 bytes).\n",
    "# WORKED with TaskManager 20GB memory but then got error \"DataLossError: corrupted record at 11483790\"\n",
    "tf.logging.set_verbosity(tf.logging.INFO)\n",
    "\n",
    "tfma_vis = run_tfma(input_csv=os.path.join(EVAL_DATA_DIR, 'data.csv'), \n",
    "                   tf_run_id=0,\n",
    "                   tfma_run_id='vis',\n",
    "                   slice_spec=ALL_SPECS,\n",
    "                   schema_file=get_schema_file(),\n",
    "                   add_metrics_callbacks=[\n",
    "                       # calibration_plot_and_prediction_histogram computes calibration plot and prediction\n",
    "                       # distribution at different thresholds.\n",
    "                       tfma.post_export_metrics.calibration_plot_and_prediction_histogram(),\n",
    "                       # auc_plots enables precision-recall curve and ROC visualization at different thresholds.\n",
    "                       tfma.post_export_metrics.auc_plots()\n",
    "                   ])\n",
    "\n",
    "print('Done')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "XCpJn307KhHZ"
   },
   "source": [
    "Plots must be visualized for an individual slice. To specify a slice, use ``tfma.slicer.SingleSliceSpec``.\n",
    "\n",
    "In the example below, we are using ``tfma.slicer.SingleSliceSpec(features=[('trip_start_hour', 1)])`` to specify the slice where trip_start_hour is 1.\n",
    "\n",
    "Plots are interactive:\n",
    "- Drag to pan\n",
    "- Scroll to zoom\n",
    "- Right click to reset the view\n",
    "\n",
    "Simply hover over the desired data point to see more details."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "8ajak6hvKjvz"
   },
   "outputs": [],
   "source": [
    "#%local\n",
    "#tfma.view.render_plot(tfma_vis, tfma.slicer.SingleSliceSpec(features=[('trip_start_hour', 1)]))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "JEQxCwlmKlMS"
   },
   "source": [
    "#### Custom metrics\n",
    "\n",
    "In addition to plots, it is also possible to compute additional metrics not present at export time or custom metrics metrics using ``add_metrics_callbacks``.\n",
    "\n",
    "All metrics in ``tf.metrics`` are supported in the callback and can be used to compose other metrics:\n",
    "https://www.tensorflow.org/api_docs/python/tf/metrics\n",
    "\n",
    "In the cells below, false negative rate is computed as an example."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "G6YlvQLrKmvz"
   },
   "outputs": [],
   "source": [
    "# Factory for a TFMA metrics callback that adds the false negative rate (FNR) to the result.\n",
    "def add_fnr_for_threshold(threshold):\n",
    "    \"\"\"Build a callback that reports the false negative rate at `threshold`.\n",
    "\n",
    "    Args:\n",
    "        threshold: cut-off handed to the thresholded `tf.metrics` ops.\n",
    "\n",
    "    Returns:\n",
    "        A function taking (features_dict, predictions_dict, labels_dict) and returning a\n",
    "        dict with a single entry, 'FNR@<threshold>', mapped to a (value_op, update_op) pair.\n",
    "    \"\"\"\n",
    "    # Imported locally, as before; presumably so `tf` resolves wherever the callback is\n",
    "    # executed -- TODO confirm.\n",
    "    import tensorflow as tf\n",
    "\n",
    "    def _add_fnr_callback(features_dict, predictions_dict, labels_dict):\n",
    "        labels = tf.squeeze(labels_dict)\n",
    "        predictions = tf.squeeze(\n",
    "            tf.cast(predictions_dict.get(tf.contrib.learn.PredictionKey.LOGISTIC), tf.float64))\n",
    "        thresholds = [threshold]\n",
    "\n",
    "        fn_value, fn_update = tf.metrics.false_negatives_at_thresholds(labels, predictions, thresholds)\n",
    "        tp_value, tp_update = tf.metrics.true_positives_at_thresholds(labels, predictions, thresholds)\n",
    "\n",
    "        # FNR = FN / (FN + TP); index 0 picks the only threshold that was requested.\n",
    "        false_negatives = fn_value[0]\n",
    "        fnr = false_negatives / (false_negatives + tp_value[0])\n",
    "        return {'FNR@' + str(threshold): (fnr, tf.group(fn_update, tp_update))}\n",
    "\n",
    "    return _add_fnr_callback"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "9pWXzEMvKpm6"
   },
   "outputs": [],
   "source": [
    "# Surface INFO-level TensorFlow log messages while the evaluation runs.\n",
    "tf.logging.set_verbosity(tf.logging.INFO)\n",
    "\n",
    "# Evaluate tf_run_id=0 again, this time with the FNR@0.75 callback attached.\n",
    "tfma_fnr = run_tfma(\n",
    "    input_csv=os.path.join(EVAL_DATA_DIR, 'data.csv'),\n",
    "    tf_run_id=0,\n",
    "    tfma_run_id='fnr',\n",
    "    slice_spec=ALL_SPECS,\n",
    "    schema_file=get_schema_file(),\n",
    "    add_metrics_callbacks=[add_fnr_for_threshold(0.75)]\n",
    ")\n",
    "\n",
    "# Kept as the last expression of the cell so its result is displayed.\n",
    "tfma.view.render_slicing_metrics(tfma_fnr, slicing_spec=FEATURE_COLUMN_SLICE_SPEC)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "brWQBOnuKrJq"
   },
   "source": [
    "## Visualization: Time Series\n",
    "\n",
    "It is important to track how your model is doing over time. TFMA offers two modes to show how your model performs over time.\n",
    "\n",
    "**Multiple model analysis** shows how a model performs from one version to another. This is useful early on to see how the addition of new features, a change in modeling technique, etc., affects performance. TFMA offers a convenient method."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "JtHDM1sPKvBx"
   },
   "source": [
    "**Multiple data analysis** shows how a model performs under different evaluation data sets. This is useful to ensure that model performance does not degrade over time. TFMA offers a convenient method."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "OsfW6L2QKyGq"
   },
   "source": [
    "It is also possible to compose a time series manually."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "cxzmxsJyKzxC"
   },
   "outputs": [],
   "source": [
    "# Create different models by running experiments with different hidden layer configurations.\n",
    "hidden_layer_configs = [\n",
    "    {'tf_run_id': 1, 'num_layers': 3, 'first_layer_size': 200, 'scale_factor': 0.7},\n",
    "    {'tf_run_id': 2, 'num_layers': 4, 'first_layer_size': 240, 'scale_factor': 0.5},\n",
    "]\n",
    "\n",
    "# Every experiment uses tft_run_id=0; only the layer settings and tf_run_id differ.\n",
    "for layer_config in hidden_layer_configs:\n",
    "    run_local_experiment(tft_run_id=0, **layer_config)\n",
    "\n",
    "print('Done')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "SLBflNxyK14D"
   },
   "outputs": [],
   "source": [
    "# Both evaluations read the same CSV, so build its path once.\n",
    "eval_data_csv = os.path.join(EVAL_DATA_DIR, 'data.csv')\n",
    "\n",
    "# tf_run_id=1 is evaluated as TFMA run 2.\n",
    "tfma_result_2 = run_tfma(\n",
    "    input_csv=eval_data_csv,\n",
    "    tf_run_id=1,\n",
    "    tfma_run_id=2,\n",
    "    slice_spec=ALL_SPECS,\n",
    "    schema_file=get_schema_file())\n",
    "\n",
    "# tf_run_id=2 is evaluated as TFMA run 3.\n",
    "tfma_result_3 = run_tfma(\n",
    "    input_csv=eval_data_csv,\n",
    "    tf_run_id=2,\n",
    "    tfma_run_id=3,\n",
    "    slice_spec=ALL_SPECS,\n",
    "    schema_file=get_schema_file())\n",
    "\n",
    "print('Done')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "N5ZVlsu3K3s6"
   },
   "source": [
    "Like plots, the time series view must be visualized for a slice too.\n",
    "\n",
    "In the example below, we are showing the overall slice.\n",
    "\n",
    "Select a metric to see its time series graph. Hover over each data point to get more details."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "d52jgg8WK51T"
   },
   "outputs": [],
   "source": [
    "# Combine the three in-memory results into a model-centric series.\n",
    "eval_results = tfma.make_eval_results([tfma_result_1, tfma_result_2, tfma_result_3], \n",
    "                                      tfma.constants.MODEL_CENTRIC_MODE)\n",
    "print(\"Done\")\n",
    "\n",
    "# The render call must be the cell's last expression: in Jupyter it returns a widget, and\n",
    "# only the final value of a cell is displayed. With a statement after it, nothing is shown.\n",
    "tfma.view.render_time_series(eval_results, OVERALL_SLICE_SPEC)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "T9o8bZWRK8Fi"
   },
   "source": [
    "Serialized results can also be used to construct a time series. Thus, there is no need to re-run TFMA for models already evaluated for a long running pipeline."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "zYyVJEWWK90c"
   },
   "outputs": [],
   "source": [
    "# Load the serialized results of TFMA runs 1, 2 and 3 instead of re-running the analysis.\n",
    "eval_results_from_disk = tfma.load_eval_results([get_tfma_output_dir(1), \n",
    "                                                 get_tfma_output_dir(2), \n",
    "                                                 get_tfma_output_dir(3)], \n",
    "                                                tfma.constants.MODEL_CENTRIC_MODE)\n",
    "print(\"Done\")\n",
    "\n",
    "# Visualize the results in a Time Series. In this case, we are showing the slice specified.\n",
    "# The render call must be the cell's last expression: in Jupyter it returns a widget, and\n",
    "# only the final value of a cell is displayed. With a statement after it, nothing is shown.\n",
    "tfma.view.render_time_series(eval_results_from_disk, FEATURE_VALUE_SPEC)"
   ]
  }
 ],
 "metadata": {
  "colab": {
   "collapsed_sections": [],
   "name": "chicago_taxi_tfma_local_playground.ipynb",
   "provenance": [],
   "version": "0.3.2"
  },
  "kernelspec": {
   "display_name": "python-flink_tutorial__meb10000",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.16"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
